// bdlcc_multipriorityqueue.h                                         -*-C++-*-
#ifndef INCLUDED_BDLCC_MULTIPRIORITYQUEUE
#define INCLUDED_BDLCC_MULTIPRIORITYQUEUE

#include <bsls_ident.h>
BSLS_IDENT("$Id: $")

//@PURPOSE: Provide a thread-enabled parameterized multi-priority queue.
//
//@CLASSES:
//  bdlcc::MultipriorityQueue: thread-enabled, multi-priority queue
//
//@SEE_ALSO:
//
//@DESCRIPTION: This component provides a thread-enabled mechanism,
// `bdlcc::MultipriorityQueue`, implementing a special-purpose priority queue
// container of items of parameterized `TYPE`.  Each item has a priority which,
// for efficiency of implementation, is limited to a relatively small number
// `N` of contiguous integers `[ 0 .. N - 1 ]`, with `N` indicated at
// construction, and 0 being the most urgent priority.  This queue also takes
// an optional allocator, supplied at construction.  Once configured, these
// instance parameters remain unchanged for the life of each multi-priority
// queue.
//
///Thread-Enabled Idioms in the `bdlcc::MultipriorityQueue` Interface
///------------------------------------------------------------------
// The thread-enabled `bdlcc::MultipriorityQueue` is, in many regards, similar
// to a value-semantic type in that there is an obvious abstract notion of
// "value" that can be described in terms of salient attributes, which for this
// type is a sequence of priority/element pairs, constrained to be in
// increasing order of priority.  There are, however, several differences in
// method behavior and signature that arise due to the thread-enabled nature of
// the queue and its anticipated usage pattern.
//
// For example, if a queue object is empty, `popFront` will block indefinitely
// until an element is added to the queue.  Also, since dynamic instance
// information, such as the number of elements currently in a queue, can be
// out-of-date by the time it is returned, some manipulators (e.g.,
// `tryPopFront`) are deliberately combined with an accessor operation (e.g.,
// `isEmpty`) in order to guarantee proper behavior.
//
// Finally, note that although the parameterized `TYPE` is expected to at least
// support copy construction and assignment, the
// `bdlcc::MultipriorityQueue<TYPE>` type currently does not support any
// value-semantic operations, since different queues could have different
// numbers of priorities, making comparison, assignment and copy construction
// awkward.
//
///Possible Future Enhancements
///----------------------------
// In addition to `popFront` and `tryPopFront`, a `bdlcc::MultipriorityQueue`
// may some day also provide a `timedPopFront` method.  This method would block
// until it is able to complete successfully or until the specified time limit
// expires.
//
///WARNING: Synchronization Required on Destruction
///------------------------------------------------
// The behavior for the destructor is undefined unless all access or
// modification of the object is completed prior to its destruction.  Some form
// of synchronization, external to the component, is required to ensure the
// precondition on the destructor is met.  For example, if two (or more)
// threads are manipulating a queue, it is *not* safe to anticipate the number
// of elements added to the queue, and destroy that queue immediately after the
// last element is popped (without additional synchronization) because one of
// the corresponding push functions may not have completed (push may, for
// instance, signal waiting threads after the element is considered added to
// the queue).
//
///Usage
///-----
// This section illustrates intended use of this component.
//
///Example 1: Simple Thread Pool
///- - - - - - - - - - - - - - -
// This example demonstrates how we might use a `bdlcc::MultipriorityQueue` to
// communicate between a single "producer" thread and multiple "consumer"
// threads.  The "producer" pushes work requests of varying priority onto the
// queue, and each "consumer" iteratively takes the highest priority work
// request from the queue and services it.
//
// We begin our example with some utility classes that define a simple "work
// item":
// ```
// enum {
//     k_MAX_CONSUMER_THREADS = 10
// };
//
// struct MyWorkData {
//     int d_i;        // input to work to be done
//
//     // Work data...
// };
//
// struct MyWorkRequest {
//     enum RequestType {
//         e_WORK = 1,
//         e_STOP = 2
//     };
//
//     RequestType d_type;
//     MyWorkData  d_data;
//
//     // Work data...
// };
// ```
// Next, we provide a simple function to service an individual work item, and a
// function to get a work item.  The details are unimportant for this example:
// ```
// void myDoWork(MyWorkData& data)
// {
//     // Do work...
//     (void)data;
// }
//
// int getWorkData(MyWorkData *result)
// {
//     static int count = 0;
//     result->d_i = rand();   // Only one thread runs this routine, so it
//                             // does not matter that 'rand()' is not
//                             // thread-safe, or that 'count' is 'static'.
//
//     return ++count >= 100;
// }
// ```
// The `myConsumer` function (below) will pop elements off the queue in
// priority order and process them.  As discussed above, note that the call to
// `queue->popFront(&item)` will block until there is an element available on
// the queue.  This function will be executed in multiple threads, so that each
// thread waits in `queue->popFront()`; `bdlcc::MultipriorityQueue` guarantees
// that each thread gets a unique element from the queue:
// ```
// void myConsumer(bdlcc::MultipriorityQueue<MyWorkRequest> *queue)
// {
//     MyWorkRequest item;
//     while (1) {
//
//         // The 'popFront' function will wait for a 'MyWorkRequest' until
//         // one is available.
//
//         queue->popFront(&item);
//
//         if (MyWorkRequest::e_STOP == item.d_type) {
//             break;
//         }
//
//         myDoWork(item.d_data);
//     }
// }
// ```
// The `myConsumerThread` function below is a callback for `bslmt::ThreadUtil`,
// which requires a "C" signature.  `bslmt::ThreadUtil::create()` expects a
// pointer to this function, and provides that function pointer to the
// newly-created thread.  The new thread then executes this function.
//
// Since `bslmt::ThreadUtil::create()` uses the familiar "C" convention of
// passing a `void` pointer, our function simply casts that pointer to our
// required type (`bdlcc::MultipriorityQueue<MyWorkRequest> *`), and then
// delegates to the queue-specific function `myConsumer` (above):
// ```
// extern "C" void *myConsumerThread(void *queuePtr)
// {
//     myConsumer ((bdlcc::MultipriorityQueue<MyWorkRequest>*) queuePtr);
//     return queuePtr;
// }
// ```
// In this simple example, the `myProducer` function (below) serves multiple
// roles: it creates the `bdlcc::MultipriorityQueue`, starts the consumer
// threads, and then produces and queues work items.  When work requests are
// exhausted, this function queues one `e_STOP` item for each consumer thread.
//
// When each consumer thread reads an `e_STOP`, it terminates its
// thread-handling function.  Note that, although the producer cannot control
// which thread pops a particular work item, it can rely on the knowledge that
// each consumer thread will read a single `e_STOP` item and then terminate.
//
// Finally, the `myProducer` function "joins" each consumer thread, which
// ensures that the thread itself will terminate correctly (see the
// `bslmt_threadutil` component-level documentation for details):
// ```
// void myProducer()
// {
//     enum {
//         k_NUM_PRIORITIES = 8,
//         k_NUM_THREADS    = 8
//     };
//
//     MyWorkRequest item;
//     MyWorkData    workData;
//
//     // Create multi-priority queue with specified number of priorities.
//
//     bdlcc::MultipriorityQueue<MyWorkRequest> queue(k_NUM_PRIORITIES);
//
//     // Start the specified number of threads.
//
//     assert(0 < k_NUM_THREADS
//         && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
//     bslmt::ThreadUtil::Handle consumerHandles[k_MAX_CONSUMER_THREADS];
//
//     for (int i = 0; i < k_NUM_THREADS; ++i) {
//         bslmt::ThreadUtil::create(&consumerHandles[i],
//                                  myConsumerThread,
//                                  &queue);
//     }
//
//     // Load work data into work requests and push them onto the queue with
//     // varying priority until all work data has been exhausted.
//
//     int count = 0;                          // used to generate priorities
//
//     while (!getWorkData(&workData)) {       // see declaration (above)
//         item.d_type = MyWorkRequest::e_WORK;
//         item.d_data = workData;
//         queue.pushBack(item, count % k_NUM_PRIORITIES);  // mixed
//                                                          // priorities
//         ++count;
//     }
//
//     // Load as many stop requests as there are active consumer threads.
//
//     for (int i = 0; i < k_NUM_THREADS; ++i) {
//         item.d_type = MyWorkRequest::e_STOP;
//         queue.pushBack(item, k_NUM_PRIORITIES - 1);  // lowest priority
//     }
//
//     // Join all of the consumer threads back with the main thread.
//
//     for (int i = 0; i < k_NUM_THREADS; ++i) {
//         bslmt::ThreadUtil::join(consumerHandles[i]);
//     }
// }
// ```
//
///Example 2: Multi-Threaded Observer
/// - - - - - - - - - - - - - - - - -
// The previous example shows a simple mechanism for distributing work requests
// over multiple threads.  This approach works well for large tasks that can be
// decomposed into discrete, independent tasks that can benefit from parallel
// execution.  Note also that the various threads are synchronized only at the
// end of execution, when the producer "joins" the various consumer threads.
//
// The simple strategy used in the first example works well for tasks that
// share no state, and are completely independent of one another.  For
// instance, a web server might use a similar strategy to distribute `http`
// requests across multiple worker threads.
//
// In more complicated examples, it is often necessary or desirable to
// synchronize the separate tasks during execution.  The second example below
// shows a single "Observer" mechanism that receives event notification from
// the various worker threads.
//
// We first create a simple `MyEvent` data type.  Worker threads will use this
// type to report information about their work.  In our example, we will report
// the "worker Id", the event number, and some arbitrary text.
//
// As with the previous example, class `MyEvent` also contains an `EventType`,
// an enumeration that indicates whether the worker has completed all work.
// The "Observer" will use this enumerated value to note when a worker thread
// has completed its work:
// ```
// enum {
//     k_MAX_CONSUMER_THREADS = 10,
//     k_MAX_EVENT_TEXT       = 80
// };
//
// struct MyEvent {
//     enum EventType {
//         e_IN_PROGRESS   = 1,
//         e_TASK_COMPLETE = 2
//     };
//
//     EventType d_type;
//     int       d_workerId;
//     int       d_eventNumber;
//     char      d_eventText[k_MAX_EVENT_TEXT];
// };
// ```
// As noted in the previous example, `bslmt::ThreadUtil::create()` spawns a new
// thread, which invokes a simple "C" function taking a `void` pointer.  In the
// previous example, we simply converted that `void` pointer into a pointer to
// `bdlcc::MultipriorityQueue<MyWorkRequest>`.
//
// In this example, however, we want to pass an additional data item.  Each
// worker thread is initialized with a unique integer value ("worker Id"),
// which identifies that thread.  We therefore create a simple `struct` that
// contains both of these values:
// ```
// struct MyWorkerData {
//     int                               d_workerId;
//     bdlcc::MultipriorityQueue<MyEvent> *d_queue;
// };
// ```
// Function `myWorker` (below) simulates a working thread by enqueuing multiple
// `MyEvent` events during execution.  In a realistic application, each
// `MyEvent` structure would likely contain different textual information.  For
// the sake of simplicity, however, our loop uses a constant value for the text
// field.  Note that various priorities are generated to illustrate the
// multi-priority aspect of this particular queue:
// ```
// void myWorker(int workerId, bdlcc::MultipriorityQueue<MyEvent> *queue)
// {
//     const int N = queue->numPriorities();
//     const int NUM_EVENTS = 5;
//     int eventNumber;    // used also to generate mixed priorities
//
//     // First push 'NUM_EVENTS' events onto 'queue' with mixed priorities.
//
//     for (eventNumber = 0; eventNumber < NUM_EVENTS; ++eventNumber) {
//         MyEvent ev = {
//             MyEvent::e_IN_PROGRESS,
//             workerId,
//             eventNumber,
//             "In-Progress Event"         // constant (for simplicity)
//         };
//         queue->pushBack(ev, eventNumber % N);       // mixed priorities
//     }
//
//     // Now push an event to end this task.
//
//     MyEvent ev = {
//         MyEvent::e_TASK_COMPLETE,
//         workerId,
//         eventNumber,
//         "Task Complete"
//     };
//     queue->pushBack(ev, N - 1);                     // lowest priority
// }
// ```
// The callback function `myWorkerThread` (below) invoked by
// `bslmt::ThreadUtil::create` takes the traditional `void` pointer.  The
// expected data is the composite structure `MyWorkerData`.  The callback
// function casts the `void` pointer to the application-specific data type and
// then uses the referenced object to construct a call to the `myWorker`
// function:
// ```
// extern "C" void *myWorkerThread(void *vWorkerPtr)
// {
//     MyWorkerData *workerPtr = (MyWorkerData *)vWorkerPtr;
//     myWorker(workerPtr->d_workerId, workerPtr->d_queue);
//     return vWorkerPtr;
// }
// ```
// For the sake of simplicity, we will implement the Observer behavior (below)
// in the main thread.  The `void` function `myObserver` starts multiple
// threads running the `myWorker` function, reads `MyEvent` values from the
// queue, and logs all messages in the order of arrival.
//
// As each `myWorker` thread terminates, it sends an `e_TASK_COMPLETE` event.
// Upon receiving this event, the `myObserver` function uses the `d_workerId`
// to find the relevant thread, and then "joins" that thread.
//
// The `myObserver` function determines when all tasks have completed simply by
// counting the number of `e_TASK_COMPLETE` messages received:
// ```
// void myObserver()
// {
//     const int k_NUM_THREADS    = 10;
//     const int k_NUM_PRIORITIES = 4;
//
//     bdlcc::MultipriorityQueue<MyEvent> queue(k_NUM_PRIORITIES);
//
//     assert(0 < k_NUM_THREADS
//         && k_NUM_THREADS <= static_cast<int>(k_MAX_CONSUMER_THREADS));
//     bslmt::ThreadUtil::Handle workerHandles[k_MAX_CONSUMER_THREADS];
//
//     // Create `k_NUM_THREADS` threads, each having a unique "worker id".
//
//     MyWorkerData workerData[k_NUM_THREADS];
//     for (int i = 0; i < k_NUM_THREADS; ++i) {
//         workerData[i].d_queue = &queue;
//         workerData[i].d_workerId = i;
//         bslmt::ThreadUtil::create(&workerHandles[i],
//                                  myWorkerThread,
//                                  &workerData[i]);
//     }
//
//     // Now print out each of the `MyEvent` values as the threads complete.
//     // This function ends after a total of `k_NUM_THREADS`
//     // `MyEvent::e_TASK_COMPLETE` events have been printed.
//
//     int nStop = 0;
//     while (nStop < k_NUM_THREADS) {
//         MyEvent ev;
//         queue.popFront(&ev);
//         bsl::cout << "[" << ev.d_workerId << "] "
//                   << ev.d_eventNumber << ". "
//                   << ev.d_eventText << bsl::endl;
//         if (MyEvent::e_TASK_COMPLETE == ev.d_type) {
//             ++nStop;
//             bslmt::ThreadUtil::join(workerHandles[ev.d_workerId]);
//         }
//     }
// }
// ```

#include <bdlscm_version.h>

#include <bdlb_bitutil.h>

#include <bdlma_concurrentpool.h>

#include <bslalg_constructorproxy.h>

#include <bslma_allocator.h>
#include <bslma_deallocatorproctor.h>
#include <bslma_default.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>

#include <bslmf_movableref.h>
#include <bslmf_nestedtraitdeclaration.h>

#include <bslmt_condition.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>
#include <bslmt_threadutil.h>

#include <bsls_assert.h>
#include <bsls_atomic.h>

#include <bsl_climits.h>
#include <bsl_cstdint.h>
#include <bsl_new.h>
#include <bsl_vector.h>

#ifndef BDE_DONT_ALLOW_TRANSITIVE_INCLUDES
#include <bslalg_typetraits.h>
#endif // BDE_DONT_ALLOW_TRANSITIVE_INCLUDES

namespace BloombergLP {
namespace bdlcc {

                 // =========================================
                 // local class MultipriorityQueue_Node<TYPE>
                 // =========================================

/// This class handles storage of one item of parameterized `TYPE` as a node
/// in a linked list of items stored in a multipriority queue for a given
/// priority.  This class is not to be used from outside this component.
template <class TYPE>
class MultipriorityQueue_Node {

    // DATA
    bslalg::ConstructorProxy<TYPE>  d_item;    // object stored in node
    MultipriorityQueue_Node<TYPE>  *d_next_p;  // next node on linked list

  private:
    // NOT IMPLEMENTED
    MultipriorityQueue_Node(const MultipriorityQueue_Node&);
    MultipriorityQueue_Node& operator=(const MultipriorityQueue_Node&);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue_Node,
                                   bslma::UsesBslmaAllocator);

    // CREATORS

    /// Create a node containing a copy of the specified `item` and having
    /// a null `next` pointer.  Use the specified `basicAllocator` to
    /// supply memory.  The behavior is undefined unless `basicAllocator` is
    /// non-null.  Note that `item` must be copyable and assignable.
    MultipriorityQueue_Node(const TYPE&              item,
                            bslma::Allocator        *basicAllocator);

    /// Create a node containing the value of the specified `item` and
    /// having a null `next` pointer.  `item` is left in a valid but
    /// unspecified state.  Use the specified `basicAllocator` to supply
    /// memory.  The behavior is undefined unless `basicAllocator` is
    /// non-null.
    MultipriorityQueue_Node(bslmf::MovableRef<TYPE>  item,
                            bslma::Allocator        *basicAllocator);

    /// Destroy this node and free all memory that was allocated on its
    /// behalf, if any.  Note that the successor node, if any, is not
    /// destroyed.
    ~MultipriorityQueue_Node();

    // MANIPULATORS

    /// Return a reference to the modifiable item stored in this node.
    TYPE& item();

    /// Return a reference to the modifiable pointer to the node following
    /// this node on the linked list.
    MultipriorityQueue_Node*& nextPtr();

    // ACCESSORS

    /// Return a pointer to the non-modifiable node following this node on
    /// the linked list, or 0 if this node has no successor.
    const MultipriorityQueue_Node *nextPtr() const;
};

                       // ==============================
                       // class MultipriorityQueue<TYPE>
                       // ==============================

/// This class implements a thread-enabled multipriority queue whose
/// priorities are restricted to a (small) set of contiguous `N` integer
/// values, `[ 0 .. N - 1 ]`, with 0 being the most urgent.
///
/// This class does have a notion of value, namely the sequence of
/// priority/element pairs, constrained to be in decreasing order of urgency
/// (i.e., monotonically increasing priority values).  However, no
/// value-semantic operations are implemented.  Note that elements having
/// the same priority are maintained in First-In-First-Out (FIFO) order.
/// Note that the current implementation supports up to a maximum of
/// `sizeof(int) * CHAR_BIT` priorities.
///
/// This class is implemented as a set of linked lists, one for each
/// priority.  Two vectors are used to maintain head and tail pointers for
/// the lists.
template <class TYPE>
class MultipriorityQueue {

    // PRIVATE CONSTANTS
    enum {
        k_BITS_PER_INT           = sizeof(int) * CHAR_BIT,
        k_DEFAULT_NUM_PRIORITIES = k_BITS_PER_INT,
        k_MAX_NUM_PRIORITIES     = k_BITS_PER_INT
    };

    // PRIVATE TYPES

    /// The type of the elements on the linked lists of items that are
    /// maintained for the `N` priorities handled by this multipriority
    /// queue.
    typedef MultipriorityQueue_Node<TYPE> Node;

    /// The type of the vectors of list head and tail pointers.
    typedef bsl::vector<Node *>                NodePtrVector;

    // DATA
    mutable bslmt::Mutex  d_mutex;         // used to synchronize access
                                           // (including 'const' access)

    bslmt::Condition      d_notEmptyCondition;
                                           // signaled on each push

    NodePtrVector         d_heads;         // pointers to heads of linked lists
                                           // -- one for each priority

    NodePtrVector         d_tails;         // pointers to tails of linked lists
                                           // -- one for each priority

    int                   d_notEmptyFlags; // bit mask indicating priorities
                                           // for which there is data, where
                                           // bit 0 is the lowest order bit,
                                           // representing most urgent priority

    bdlma::ConcurrentPool d_pool;          // memory pool used for node storage

    bsls::AtomicInt       d_length;        // total number of items in this
                                           // multipriority queue

    bool                  d_enabledFlag;   // enabled/disabled state of pushes
                                           // to the multipriority queue (does
                                           // not affect pops)

    bslma::Allocator     *d_allocator_p;   // memory allocator (held)

  private:
    // NOT IMPLEMENTED
    MultipriorityQueue(const MultipriorityQueue&);
    MultipriorityQueue& operator=(const MultipriorityQueue&);

  private:
    // PRIVATE MANIPULATORS

    /// Attempt to remove (immediately) the least-recently added item having
    /// the most urgent priority (lowest value) from this multipriority
    /// queue.  If the specified `blockFlag` is `true`, this method blocks
    /// the calling thread until an item becomes available.  On success,
    /// load the value of the popped item into the specified `item`; if the
    /// specified `itemPriority` is non-null, load the priority of the
    /// popped item into `itemPriority`; and return 0.  Otherwise, leave
    /// `item` and `itemPriority` unmodified, and return a non-zero value
    /// indicating that this multipriority queue was empty.  The behavior is
    /// undefined unless `item` is non-null.  Note that a non-zero value can
    /// be returned only if `blockFlag` is `false`.
    int tryPopFrontImpl(TYPE *item, int *itemPriority, bool blockFlag);

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(MultipriorityQueue,
                                   bslma::UsesBslmaAllocator);

    // CREATORS

    /// Create a multipriority queue.  Optionally specify `numPriorities`,
    /// the number of distinct priorities supported by the multipriority
    /// queue.  If `numPriorities` is not specified, the
    /// (implementation-imposed maximum) number 32 is used.  Optionally
    /// specify a `basicAllocator` used to supply memory.  If
    /// `basicAllocator` is 0, the currently installed default allocator is
    /// used.  The behavior is undefined unless `1 <= numPriorities <= 32`
    /// (if specified).
    explicit MultipriorityQueue(bslma::Allocator *basicAllocator = 0);
    explicit MultipriorityQueue(int               numPriorities,
                                bslma::Allocator *basicAllocator = 0);

    /// Destroy this container.  The behavior is undefined unless all access
    /// or modification of the container has completed prior to this call.
    ~MultipriorityQueue();

    // MANIPULATORS

    /// Remove the least-recently added item having the most urgent priority
    /// (lowest value) from this multi-priority queue and load its value
    /// into the specified `item`.  If this queue is empty, this method
    /// blocks the calling thread until an item becomes available.  If the
    /// optionally specified `itemPriority` is non-null, load the priority
    /// of the popped item into `itemPriority`.  The behavior is undefined
    /// unless `item` is non-null.  Note that this is unaffected by the
    /// enabled / disabled state of the queue.
    void popFront(TYPE *item, int *itemPriority = 0);

    /// Insert the value of the specified `item` with the specified
    /// `itemPriority` into this multipriority queue before any queued items
    /// having a less urgent priority (higher value) than `itemPriority`,
    /// and after any items having the same or more urgent priority (lower
    /// value) than `itemPriority`.  If the multipriority queue is enabled,
    /// the push succeeds and `0` is returned, otherwise the push fails, the
    /// queue remains unchanged, and a nonzero value is returned.  The
    /// behavior is undefined unless `0 <= itemPriority < numPriorities()`.
    int pushBack(const TYPE& item, int itemPriority);

    /// Insert the value of the specified `item` with the specified
    /// `itemPriority` into this multipriority queue before any queued items
    /// having a less urgent priority (higher value) than `itemPriority`,
    /// and after any items having the same or more urgent priority (lower
    /// value) than `itemPriority`.  `item` is left in a valid but
    /// unspecified state.  If the multipriority queue is enabled, the push
    /// succeeds and `0` is returned, otherwise the push fails, the queue
    /// remains unchanged, and a nonzero value is returned.  The behavior is
    /// undefined unless `0 <= itemPriority < numPriorities()`.
    int pushBack(bslmf::MovableRef<TYPE> item, int itemPriority);

    /// Insert the value of the specified `item` with the specified
    /// `itemPriority` onto the back of this multipriority queue before any
    /// queued items having a less urgent priority (higher value) than
    /// `itemPriority`, and after any items having the same or more urgent
    /// priority (lower value) than `itemPriority`.  All of the specified
    /// `numItems` items are pushed as a single atomic action, unless the
    /// copy constructor for one of them throws an exception, in which case
    /// a possibly empty subset of the pushes will have completed and no
    /// memory will be leaked.  `Raw` means that the push will succeed even
    /// if the multipriority queue is disabled.  Note that this method is
    /// targeted for specific use by the class
    /// `bdlmt::MultipriorityThreadPool`.  The behavior is undefined unless
    /// `0 <= itemPriority < numPriorities()`.
    void pushBackMultipleRaw(const TYPE& item, int itemPriority, int numItems);

    /// Insert the value of the specified `item` with the specified
    /// `itemPriority` into the front of this multipriority queue the
    /// specified `numItems` times, before any queued items having the same
    /// or less urgent priority (higher value) than `itemPriority`, and
    /// after any items having more urgent priority (lower value) than
    /// `itemPriority`.  All `numItems` items are pushed as a single atomic
    /// action, unless the copy constructor throws while creating one of
    /// them, in which case a possibly empty subset of the pushes will have
    /// completed and no memory will be leaked.  `Raw` means that the push
    /// will succeed even if the multipriority queue is disabled.  The
    /// behavior is undefined unless `0 <= itemPriority < numPriorities()`.
    /// Note that this method is targeted at specific uses by the class
    /// `bdlmt::MultipriorityThreadPool`.
    void pushFrontMultipleRaw(const TYPE& item,
                              int         itemPriority,
                              int         numItems);

    /// Attempt to remove (immediately) the least-recently added item having
    /// the most urgent priority (lowest value) from this multi-priority
    /// queue.  On success, load the value of the popped item into the
    /// specified `item`; if the optionally specified `itemPriority` is
    /// non-null, load the priority of the popped item into `itemPriority`;
    /// and return 0.  Otherwise, leave `item` and `itemPriority`
    /// unmodified, and return a non-zero value indicating that this queue
    /// was empty.  The behavior is undefined unless `item` is non-null.
    /// Note that this is unaffected by the enabled / disabled state of the
    /// queue.
    int tryPopFront(TYPE *item, int *itemPriority = 0);

    /// Remove and destroy all items from this multi-priority queue.
    void removeAll();

    /// Enable pushes to this multipriority queue.  This method has no
    /// effect unless the queue was disabled.
    void enable();

    /// Disable pushes to this multipriority queue.  This method has no
    /// effect unless the queue was enabled.
    void disable();

    // ACCESSORS

    /// Return the number of distinct priorities (indicated at construction)
    /// that are supported by this multi-priority queue.
    int numPriorities() const;

    /// Return the total number of items in this multi-priority queue.
    int length() const;

    /// Return `true` if there are no items in this multi-priority queue,
    /// and `false` otherwise.
    bool isEmpty() const;

    /// Return `true` if this multipriority queue is enabled, and `false`
    /// otherwise.
    bool isEnabled() const;
};

// ============================================================================
//                            INLINE DEFINITIONS
// ============================================================================

                 // -----------------------------------------
                 // local class MultipriorityQueue_Node<TYPE>
                 // -----------------------------------------

// CREATORS
template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::MultipriorityQueue_Node(
                                              const TYPE&       item,
                                              bslma::Allocator *basicAllocator)
: d_item(item, basicAllocator)  // proxy forwards 'basicAllocator' to 'TYPE'
                                // only if 'TYPE' uses 'bslma' allocators
, d_next_p(0)                   // a new node starts out unlinked
{}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::MultipriorityQueue_Node(
                                       bslmf::MovableRef<TYPE>  item,
                                       bslma::Allocator        *basicAllocator)
: d_item(bslmf::MovableRefUtil::move(item), basicAllocator)
                                // move 'item' into the proxy; the proxy
                                // forwards 'basicAllocator' to 'TYPE' only if
                                // 'TYPE' uses 'bslma' allocators
, d_next_p(0)                   // a new node starts out unlinked
{}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>::~MultipriorityQueue_Node()
{
    // Intentionally empty: 'd_item' destroys the stored object; the successor
    // node (if any) is not owned by this node and is not deleted here.
}

// MANIPULATORS
template <class TYPE>
inline
TYPE& MultipriorityQueue_Node<TYPE>::item()
{
    // Unwrap the 'ConstructorProxy' to expose the stored 'TYPE' object.

    return d_item.object();
}

template <class TYPE>
inline
MultipriorityQueue_Node<TYPE>*& MultipriorityQueue_Node<TYPE>::nextPtr()
{
    // Return a modifiable reference so callers can relink this node.

    return d_next_p;
}

// ACCESSORS
template <class TYPE>
inline
const MultipriorityQueue_Node<TYPE> *
                            MultipriorityQueue_Node<TYPE>::nextPtr() const
{
    // Non-modifiable view of the successor; 0 if this node is the tail.

    return d_next_p;
}

                       // ------------------------------
                       // class MultipriorityQueue<TYPE>
                       // ------------------------------

// PRIVATE MANIPULATORS
/// Pop the least-recently added, most urgent item (see the contract at the
/// declaration).  Implementation notes: the critical section covers only
/// the list/bit-mask manipulation; reporting the priority and releasing the
/// node's memory are performed after the lock is released to keep the lock
/// hold time short.
template <class TYPE>
int MultipriorityQueue<TYPE>::tryPopFrontImpl(TYPE *item,
                                              int  *itemPriority,
                                              bool  blockFlag)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    Node *condemned;    // node to destroy after the lock is released
    int priority;       // priority of the popped item

    BSLS_ASSERT(item);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        while (0 == d_length) {
            // Note that if we get a spurious signal, we will check the
            // 'blockFlag' unnecessarily, but that will typically be a rare
            // occurrence.  This arrangement minimizes the time taken in the
            // case where '0 != d_length', which will typically be a frequent
            // occurrence.

            if (blockFlag) {
                d_notEmptyCondition.wait(&d_mutex);
            }
            else {
                return e_FAILURE;                                     // RETURN
            }
        }

        // The index of the lowest-order set bit is the most urgent priority
        // that currently has data.

        priority = bdlb::BitUtil::numTrailingUnsetBits(
                                               (bsl::uint32_t)d_notEmptyFlags);
        BSLS_ASSERT(priority < k_MAX_NUM_PRIORITIES);
            // verifies there is at least one priority bit set.  Note that
            // 'numTrailingUnsetBits' cannot return a negative value.

        Node *& head = d_heads[priority];
        condemned = head;

        *item = bslmf::MovableRefUtil::move(condemned->item());  // might throw

        head = head->nextPtr();
        if (0 == head) {
            // The last item with this priority was just popped.

            BSLS_ASSERT(d_tails[priority] == condemned);

            // Clear the bit using unsigned arithmetic: the previous
            // '~(1 << priority)' was a signed left shift, which has undefined
            // behavior for 'priority == 31' on a 32-bit 'int'.

            d_notEmptyFlags &= static_cast<int>(~(1u << priority));
        }

        --d_length;
    }

    // Report the priority and destroy/release the node outside the critical
    // section.

    if (itemPriority) {
        *itemPriority = priority;
    }

    condemned->~Node();
    d_pool.deallocate(condemned);

    return e_SUCCESS;
}

// CREATORS
/// Create a multipriority queue having the default number of priorities.
/// Optionally specify a `basicAllocator` used to supply memory; if 0, the
/// currently installed default allocator is used.  The queue is initially
/// empty and enabled.
template <class TYPE>
MultipriorityQueue<TYPE>::MultipriorityQueue(bslma::Allocator *basicAllocator)
: d_heads((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
          basicAllocator)
, d_tails((typename NodePtrVector::size_type)k_DEFAULT_NUM_PRIORITIES, 0,
          basicAllocator)
, d_notEmptyFlags(0)
, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
, d_length(0)
, d_enabledFlag(true)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
}

/// Create a multipriority queue having the specified `numPriorities`
/// distinct priorities.  Optionally specify a `basicAllocator` used to
/// supply memory; if 0, the currently installed default allocator is used.
/// The queue is initially empty and enabled.  The behavior is undefined
/// unless `1 <= numPriorities <= k_MAX_NUM_PRIORITIES`.
template <class TYPE>
MultipriorityQueue<TYPE>::MultipriorityQueue(int               numPriorities,
                                             bslma::Allocator *basicAllocator)
: d_heads((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
, d_tails((typename NodePtrVector::size_type)numPriorities, 0, basicAllocator)
, d_notEmptyFlags(0)
, d_pool(sizeof(Node), bslma::Default::allocator(basicAllocator))
, d_length(0)
, d_enabledFlag(true)
, d_allocator_p(bslma::Default::allocator(basicAllocator))
{
    BSLS_ASSERT(1                       <= numPriorities);
    BSLS_ASSERT(k_MAX_NUM_PRIORITIES >= numPriorities);
}

/// Destroy this queue.  All remaining items are removed first; the asserts
/// verify (in appropriate build modes) that 'removeAll' left every head
/// pointer null and the queue empty.
template <class TYPE>
MultipriorityQueue<TYPE>::~MultipriorityQueue()
{
    removeAll();

    typename NodePtrVector::iterator it;
    typename NodePtrVector::iterator endIt;

    for (it = d_heads.begin(), endIt = d_heads.end(); endIt != it; ++it) {
        BSLS_ASSERT(!*it);
    }

    // Tails do not get set to null by 'removeAll', so are indeterminate.

    BSLS_ASSERT(isEmpty());
    BSLS_ASSERT(0 == d_notEmptyFlags);
}

// MANIPULATORS
/// Remove the most-urgent item from this queue, loading it into the
/// specified `item` and, if non-null, its priority into `itemPriority`,
/// blocking until an item becomes available if the queue is empty.
template <class TYPE>
inline
void MultipriorityQueue<TYPE>::popFront(TYPE *item, int *itemPriority)
{
    // Delegate to the common implementation, requesting blocking behavior.

    const bool blockFlag = true;
    tryPopFrontImpl(item, itemPriority, blockFlag);
}

/// Push a copy of the specified `item` onto this queue at the specified
/// `itemPriority`.  Return 0 on success, and a non-zero value (leaving the
/// queue unchanged) if this queue is disabled.  The behavior is undefined
/// unless `0 <= itemPriority < numPriorities()`.
template <class TYPE>
int MultipriorityQueue<TYPE>::pushBack(const TYPE& item, int itemPriority)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    // Allocate and copy construct.  Note we are doing this work outside the
    // mutex, which is advantageous in that no one is waiting on us, but it has
    // the disadvantage that we haven't checked whether this multipriority
    // queue is disabled, in which case we'll throw the new node away.
    //
    // Note the queue being disabled is not the usual case.  Note a race
    // condition occurs if we check 'd_enabledFlag' outside the mutex.

    Node *newNode = static_cast<Node *>(d_pool.allocate());
    bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(newNode,
                                                                 &d_pool);

    ::new (newNode) Node(item, d_allocator_p);                   // might throw
    deallocator.release();

    // From here on, 'deleter' owns the fully constructed node, and will
    // destroy it and return it to the pool unless ownership is released.

    bslma::ManagedPtr<Node> deleter(newNode, &d_pool);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        if (!d_enabledFlag) {
            return e_FAILURE;                                         // RETURN
        }

        deleter.release();

        // Append the node to the list for 'itemPriority', marking that
        // priority non-empty if its list was previously empty.

        const int mask = 1 << itemPriority;
        if (d_notEmptyFlags & mask) {
            d_tails[itemPriority]->nextPtr() = newNode;
        }
        else {
            d_heads[itemPriority] = newNode;
            d_notEmptyFlags |= mask;
        }
        d_tails[itemPriority] = newNode;

        ++d_length;
    }

    // Signal after releasing the mutex so a woken consumer is less likely to
    // block immediately on the mutex.

    d_notEmptyCondition.signal();

    return e_SUCCESS;
}

/// Push the value of the specified movable `item` onto this queue at the
/// specified `itemPriority`.  Return 0 on success, and a non-zero value
/// (leaving `item` and the queue unchanged) if this queue is disabled.
/// The behavior is undefined unless `0 <= itemPriority < numPriorities()`.
template <class TYPE>
int MultipriorityQueue<TYPE>::pushBack(bslmf::MovableRef<TYPE> item,
                                       int                     itemPriority)
{
    enum { e_SUCCESS = 0, e_FAILURE = -1 };

    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    // Allocate the node outside the mutex, which is advantageous in that no
    // one is waiting on us; construction of the item is deferred until after
    // the enabled check below, since a move (unlike a copy) cannot be backed
    // out.
    //
    // Note the queue being disabled is not the usual case.  Note a race
    // condition occurs if we check d_enabledFlag outside the mutex.

    Node *newNode = static_cast<Node *>(d_pool.allocate());
    bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(newNode,
                                                                 &d_pool);

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        // Do the enable check before the move, since if it is a move and not a
        // copy, there's no backing out after that.

        if (!d_enabledFlag) {
            return e_FAILURE;                                         // RETURN
        }

        ::new (newNode) Node(bslmf::MovableRefUtil::move(item),  // might throw
                             d_allocator_p);
        deallocator.release();

        // Append the node to the list for 'itemPriority', marking that
        // priority non-empty if its list was previously empty.

        const int mask = 1 << itemPriority;
        if (d_notEmptyFlags & mask) {
            d_tails[itemPriority]->nextPtr() = newNode;
        }
        else {
            d_heads[itemPriority] = newNode;
            d_notEmptyFlags |= mask;
        }
        d_tails[itemPriority] = newNode;

        ++d_length;
    }

    // Signal after releasing the mutex so a woken consumer is less likely to
    // block immediately on the mutex.

    d_notEmptyCondition.signal();

    return e_SUCCESS;
}

/// Push the specified `numItems` copies of the specified `item` onto the
/// back of this queue at the specified `itemPriority`.  Note that this
/// "raw" method does not consult `d_enabledFlag`, and that if copy
/// construction of an item throws, any items already pushed remain in the
/// queue.  The behavior is undefined unless
/// `0 <= itemPriority < numPriorities()`.
template <class TYPE>
void MultipriorityQueue<TYPE>::pushBackMultipleRaw(const TYPE& item,
                                                   int         itemPriority,
                                                   int         numItems)
{
    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    const int mask = 1 << itemPriority;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        for (int ii = 0; ii < numItems; ++ii) {
            Node *newNode = static_cast<Node *>(d_pool.allocate());
            bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(
                                                             newNode, &d_pool);

            ::new (newNode) Node(item, d_allocator_p);           // might throw
            deallocator.release();

            // Append the node to the list for 'itemPriority', marking that
            // priority non-empty if its list was previously empty.

            if (d_notEmptyFlags & mask) {
                d_tails[itemPriority]->nextPtr() = newNode;
            }
            else {
                d_heads[itemPriority] = newNode;
                d_notEmptyFlags |= mask;
            }
            d_tails[itemPriority] = newNode;

            ++d_length;
        }
    }

    // Signal once per pushed item, after releasing the mutex, so that up to
    // 'numItems' waiting consumers can be woken.

    for (int ii = 0; ii < numItems; ++ii) {
        d_notEmptyCondition.signal();
    }
}

/// Push the specified `numItems` copies of the specified `item` onto the
/// front of this queue at the specified `itemPriority`.  Note that this
/// "raw" method does not consult `d_enabledFlag`, and that if copy
/// construction of an item throws, any items already pushed remain in the
/// queue.  The behavior is undefined unless
/// `0 <= itemPriority < numPriorities()`.
template <class TYPE>
void MultipriorityQueue<TYPE>::pushFrontMultipleRaw(const TYPE& item,
                                                    int         itemPriority,
                                                    int         numItems)
{
    BSLS_ASSERT((unsigned)itemPriority < d_heads.size());

    const int mask = 1 << itemPriority;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        for (int ii = 0; ii < numItems; ++ii) {
            Node *newNode = static_cast<Node *>(d_pool.allocate());
            bslma::DeallocatorProctor<bdlma::ConcurrentPool> deallocator(
                                                             newNode, &d_pool);

            ::new (newNode) Node(item, d_allocator_p);           // might throw
            deallocator.release();

            // Prepend the node to the list for 'itemPriority'; if the list
            // was empty, the new node is also the tail, and the priority is
            // marked non-empty.

            Node *& head = d_heads[itemPriority];
            if (!head) {
                d_tails[itemPriority] = newNode;
                d_notEmptyFlags |= mask;
            }
            newNode->nextPtr() = head;
            head = newNode;

            ++d_length;
        }
    }

    // Signal once per pushed item, after releasing the mutex, so that up to
    // 'numItems' waiting consumers can be woken.

    for (int ii = 0; ii < numItems; ++ii) {
        d_notEmptyCondition.signal();
    }
}

/// Attempt to remove the most-urgent item from this queue without
/// blocking, loading it into the specified `item` and, if non-null, its
/// priority into `itemPriority`.  Return 0 on success, and a non-zero
/// value if the queue was empty.
template <class TYPE>
inline
int MultipriorityQueue<TYPE>::tryPopFront(TYPE *item, int *itemPriority)
{
    // Delegate to the common implementation, requesting non-blocking
    // behavior.

    const bool blockFlag = false;
    return tryPopFrontImpl(item, itemPriority, blockFlag);
}

/// Remove all items from this queue, leaving it empty.  To keep the
/// critical section short, the lists are detached under the mutex and the
/// nodes destroyed afterward.
template <class TYPE>
void MultipriorityQueue<TYPE>::removeAll()
{
    Node *condemnedList = 0;

    {
        bslmt::LockGuard<bslmt::Mutex> lock(&d_mutex);

        while (d_notEmptyFlags) {
            // Splice the (non-empty) list for the most-urgent remaining
            // priority onto the front of 'condemnedList', then clear its
            // head and its not-empty bit.

            const int priority = bdlb::BitUtil::numTrailingUnsetBits(
                                  static_cast<bsl::uint32_t>(d_notEmptyFlags));

            Node *& head = d_heads[priority];
            BSLS_ASSERT(head);

            d_tails[priority]->nextPtr() = condemnedList;
            condemnedList = head;

            head = 0;

            // Note that 'd_tails[priority]' is deliberately left stale; see
            // the comment in the destructor.

            d_notEmptyFlags &= ~(1 << priority);
        }

        BSLS_ASSERT(0 == d_notEmptyFlags);

        d_length = 0;
    }

    // Destroy and reclaim the removed nodes outside the mutex.

    Node *node = condemnedList;
    while (node) {
        Node *condemnedNode = node;
        node = node->nextPtr();

        condemnedNode->~Node();
        d_pool.deallocate(condemnedNode);
    }
}

/// Enable this queue, so that subsequent calls to `pushBack` succeed.
template <class TYPE>
inline
void MultipriorityQueue<TYPE>::enable()
{
    // Set the flag under the mutex so the change is synchronized with
    // 'pushBack', which tests the flag under the same mutex.

    bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);

    d_enabledFlag = true;
}

/// Disable this queue, so that subsequent calls to `pushBack` fail.
template <class TYPE>
inline
void MultipriorityQueue<TYPE>::disable()
{
    // Clear the flag under the mutex so the change is synchronized with
    // 'pushBack', which tests the flag under the same mutex.

    bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex);

    d_enabledFlag = false;
}

// ACCESSORS
/// Return the number of distinct priorities supported by this queue, as
/// fixed at construction.
template <class TYPE>
inline
int MultipriorityQueue<TYPE>::numPriorities() const
{
    // Each priority has exactly one entry in 'd_heads'.

    const typename NodePtrVector::size_type numHeads = d_heads.size();
    return static_cast<int>(numHeads);
}

/// Return the number of items currently in this queue.  Note that the
/// returned value is a snapshot that can be out of date by the time the
/// caller examines it.
template <class TYPE>
inline
int MultipriorityQueue<TYPE>::length() const
{
    return d_length;
}

/// Return `true` if there are no items in this multi-priority queue, and
/// `false` otherwise.  Note that the result is a snapshot that can be out
/// of date by the time the caller examines it.
template <class TYPE>
inline
bool MultipriorityQueue<TYPE>::isEmpty() const
{
    return d_length == 0;
}

/// Return `true` if this multipriority queue is enabled, and `false`
/// otherwise.  Note that the result is a snapshot that can be out of date
/// by the time the caller examines it.
template <class TYPE>
inline
bool MultipriorityQueue<TYPE>::isEnabled() const
{
    const bool enabled = d_enabledFlag;
    return enabled;
}

}  // close package namespace
}  // close enterprise namespace

#endif

// ----------------------------------------------------------------------------
// Copyright 2015 Bloomberg Finance L.P.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------- END-OF-FILE ----------------------------------
